package com.easy.mq.listener.adapter;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.config.consumer.AbstractConsumerConfig;
import com.easy.mq.config.consumer.KafkaConsumerConfig;
import com.easy.mq.enums.ConsumeResultStatus;
import com.easy.mq.event.MQEvent;
import com.easy.mq.listener.AbstractMessageListenerContainer;
import com.easy.mq.listener.ManagerListener;
import com.easy.mq.transition.SerializationFactory;

/**
 * Kafka implementation of {@link AbstractMessageListenerContainer}.
 *
 * <p>On {@link #registerMessageListener(AbstractConsumerConfig)} a daemon thread is started
 * which creates a {@link KafkaConsumer}, subscribes to the configured topic and runs a
 * poll/dispatch loop ({@link ListenerInvoker#start()}). Records are handed to the
 * {@link ManagerListener} either one at a time, or buffered and dispatched as a batch when
 * the configured parent type is {@code java.util.List}. Failed records may be replayed by
 * seeking back to their offset, bounded by a retry counter kept in the external cache
 * exposed by {@code consumerConfig.getCache()} ("retries.num" config key).
 *
 * <p>NOTE(review): several code paths below access the same {@link KafkaConsumer} from more
 * than one thread (poll loop vs. {@link BatchTask}); KafkaConsumer is documented as NOT
 * thread-safe, so this needs to be confirmed/serialized — see inline notes.
 */
public class KafkaListenerAdapter<T> extends AbstractMessageListenerContainer {
	
	
	 // Offsets of the records currently buffered for batch dispatch, keyed by partition;
	 // used to seek back when a batch fails. NOTE(review): entries are built from
	 // record.offset() (not offset + 1) — committing these values as-is would replay the
	 // last record; confirm the intended semantics.
	 private Map<TopicPartition, OffsetAndMetadata> offsetsMap = new ConcurrentHashMap<>();  

	private static final Logger logger = LoggerFactory.getLogger(KafkaListenerAdapter.class);

	// Downstream dispatcher that forwards events to the application-level listener.
	private ManagerListener<T> managerListener;
	
	// Buffer of deserialized events awaiting batch dispatch (batch mode only). Shared
	// between the poll-loop thread and the BatchTask scheduler thread.
	private   List<MQEvent<Object>> bufferEventList = new CopyOnWriteArrayList<>();
	
	//final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();
	
	// Default retry count applied when the "retries.num" config key is absent (0 = retries disabled).
	private final static int  RETRIES_NUM = 0;
	
	// Timestamp (ms) at which the current batch started filling / was last flushed.
	// BatchTask compares it against "batch.timeout" to trigger a timeout-based flush.
	private long curTime = 0;
	
	/** @return timestamp (ms) of the last batch flush (0 until the first record is buffered). */
	public synchronized long getCurTime() {
		return curTime;
	}

	/** Records the timestamp (ms) of the last batch flush. */
	public synchronized void setCurTime(long curTime) {
		this.curTime = curTime;
	}

	// Daemon thread running the ListenerConsumer poll loop.
	private Thread thread = null;

	public KafkaListenerAdapter(ManagerListener<T> managerListener) {
		this.managerListener = managerListener;
	}

	/**
	 * Starts a daemon thread that builds the KafkaConsumer from the given config,
	 * subscribes to its topic and runs the poll/dispatch loop until the invoker is
	 * deactivated.
	 *
	 * @param consumerConfig must be a {@link KafkaConsumerConfig}; a ClassCastException
	 *                       is raised otherwise
	 */
	@Override
	public void registerMessageListener(AbstractConsumerConfig consumerConfig) {
		KafkaConsumerConfig kafkaConsumerConfig = (com.easy.mq.config.consumer.KafkaConsumerConfig) consumerConfig;
		ListenerConsumer consumer = new ListenerConsumer(kafkaConsumerConfig);
		thread = new Thread(consumer);
		thread.setDaemon(true);
		thread.start();
	}

	/**
	 * Owns the KafkaConsumer for one subscription plus the auxiliary timer/scheduler
	 * used for the invoker keep-alive and the batch-timeout flush.
	 */
	class ListenerConsumer implements Runnable {

		private final Logger logger = LoggerFactory.getLogger(ListenerConsumer.class);

		// Daemon timer used to re-arm the invoker's active flag (see DaemonTask).
		private Timer timer = new Timer("ListenerConsumer-timer", true);
		
		// Single-threaded scheduler driving the batch-timeout flush (see BatchTask).
		private ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);

		private  ListenerInvoker invoker;

		//private  Future<?> listenerInvokerFuture;

		private  KafkaConsumer<String, String> kafkaConsumer;
		
		private KafkaConsumerConfig consumerConfig;
		
		public  ListenerConsumer(KafkaConsumerConfig consumerConfig) {
			this.consumerConfig = consumerConfig;
		}

		@Override
		public void run() {
			kafkaConsumer = new KafkaConsumer<String, String>(getConfig());
			
			kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getTopic()));
			// NOTE(review): startInvoker() runs the poll loop synchronously on this
			// thread — ListenerInvoker.start() only returns once the loop exits — so
			// the timer/scheduler registrations below are not reached while the
			// consumer is actively polling. Confirm whether the invoker was meant to
			// run on its own thread (the commented-out executor code suggests so).
			startInvoker();
			timer.schedule(new DaemonTask(this), 10 * 1000, 10 * 1000);
			//batchTimer.schedule(new BatchTask(this), 2 * 1000, 2 * 1000);
			if(getBathNum(this) >1) {
				scheduledThreadPool.scheduleAtFixedRate(new BatchTask(this), 10,10, TimeUnit.SECONDS);
			}
		}

		// Creates the invoker and runs its poll loop (blocking; see note in run()).
		private void startInvoker() {
			invoker = new ListenerInvoker(this);
			invoker.start();
			/*ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
			listenerInvokerFuture = fixedThreadPool.submit(this.invoker);*/
			
		}
		 /**
		  * Builds the KafkaConsumer Properties by copying a fixed whitelist of keys
		  * from the consumer config map, skipping keys that are absent or empty.
		  * NOTE(review): "auto.commit.interval.ms" and "queued.max.message.chunks"
		  * are copied twice (harmless but redundant), and several keys here
		  * ("zookeeper.*", "fetch.size", ...) belong to the legacy consumer and are
		  * ignored by the new-consumer API — confirm which client era is targeted.
		  */
		 private  Properties getConfig() {
		    	Properties props = new Properties();  
		    	
		    	if(isNotEmpty("bootstrap.servers")) {
		    		props.setProperty("bootstrap.servers", getValue("bootstrap.servers"));
		    	}
		    	if(isNotEmpty("key.deserializer")) {
		    		props.setProperty("key.deserializer", getValue("key.deserializer"));
		    	}
		    	if(isNotEmpty("value.deserializer")) {
		    		props.setProperty("value.deserializer", getValue("value.deserializer"));
		    	}
		    	if(isNotEmpty("group.id")) {
		    		props.setProperty("group.id", getValue("group.id"));
		    	}
		    	if(isNotEmpty("acks")) {
		    		props.setProperty("acks", getValue("acks"));
		    	}
		    	if(isNotEmpty("socket.timeout.ms")) {
		    		props.setProperty("socket.timeout.ms", getValue("socket.timeout.ms"));
		    	}
		    	if(isNotEmpty("socket.buffersize")) {
		    		props.setProperty("socket.buffersize", getValue("socket.buffersize"));
		    	}
		    	if(isNotEmpty("fetch.size")) {
		    		props.setProperty("fetch.size", getValue("fetch.size"));
		    	}
		    	if(isNotEmpty("backoff.increment.ms")) {
		    		props.setProperty("backoff.increment.ms", getValue("backoff.increment.ms"));
		    	}
		    	if(isNotEmpty("queued.max.message.chunks")) {
		    		props.setProperty("queued.max.message.chunks", getValue("queued.max.message.chunks"));
		    	}
		    	if(isNotEmpty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
		    		props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getValue(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
		    	}
		    	if(isNotEmpty("auto.commit.interval.ms")) {
		    		props.setProperty("auto.commit.interval.ms", getValue("auto.commit.interval.ms"));
		    	}
		    	if(isNotEmpty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
		    		props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getValue(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
		    	}
		    	if(isNotEmpty("consumer.timeout.ms")) {
		    		props.setProperty("consumer.timeout.ms", getValue("consumer.timeout.ms"));
		    	}
		    /*	if(isNotEmpty("rebalance.retries.max")) {
		    		props.setProperty(ConsumerConfig.RE", getValue("rebalance.retries.max"));
		    	}
		    	*/
		    	if(isNotEmpty("consumer.id")) {
		    		props.setProperty("consumer.id", getValue("consumer.id"));
		    	}
		    	if(isNotEmpty("auto.commit.interval.ms")) {
		    		props.setProperty("auto.commit.interval.ms", getValue("auto.commit.interval.ms"));
		    	}
		    	if(isNotEmpty("queued.max.message.chunks")) {
		    		props.setProperty("queued.max.message.chunks", getValue("queued.max.message.chunks"));
		    	}
		    	if(isNotEmpty("rebalance.backoff.ms")) {
		    		props.setProperty("rebalance.backoff.ms", getValue("rebalance.backoff.ms"));
		    	}
		    	if(isNotEmpty("client.id")) {
		    		props.setProperty("client.id", getValue("client.id"));
		    	}/*else {
		    		props.setProperty("client.id", getValue("group.id"));
		    	}*/
		    	if(isNotEmpty("zookeeper.connect")) {
		    		props.setProperty("zookeeper.connect", getValue("zookeeper.connect"));
		    	}
		    	if(isNotEmpty("zookeeper.session.timeout.ms")) {
		    		props.setProperty("zookeeper.session.timeout.ms", getValue("zookeeper.session.timeout.ms"));
		    	}
		    	if(isNotEmpty("zookeeper.connection.timeout.ms")) {
		    		props.setProperty("zookeeper.connection.timeout.ms", getValue("zookeeper.connection.timeout.ms"));
		    	}
		    	if(isNotEmpty("zookeeper.sync.time.ms")) {
		    		props.setProperty("zookeeper.sync.time.ms", getValue("zookeeper.sync.time.ms"));
		    	}
		    	
		    	/*if(isNotEmpty("")) {
		    		props.setProperty("", getValue(""));
		    	}*/
		    	return props;
		    }
		 // True when the config map has a non-null, non-empty value for the given key.
		 private boolean isNotEmpty(String key) {
			 if(MapUtils.getString(consumerConfig.getConfig(),key) == null) {
				 return false;
			 }
			 return StringUtils.isNotEmpty(String.valueOf(consumerConfig.getConfig().get(key)));
		}
		 
		 // String form of the config value; "null" for absent keys (guard with isNotEmpty first).
		 private String getValue(String key) {
			 return  String.valueOf(consumerConfig.getConfig().get(key));
		 }
	}
	

	/**
	 * Keep-alive task: re-arms the invoker's {@code active} flag every 10s.
	 * NOTE(review): setting the flag back to true does not restart the poll loop
	 * once {@link ListenerInvoker#start()} has returned — confirm this task still
	 * serves a purpose.
	 */
	class DaemonTask extends TimerTask {

		ListenerConsumer consumer;

		DaemonTask(ListenerConsumer consumer) {
			this.consumer = consumer;
		}

		@Override
		public void run() {
			if(!consumer.invoker.active){
				consumer.invoker.active = true;
			}
		}

	}
	
	/**
	 * Timeout-based batch flush: when in batch (List) mode and the buffer has been
	 * waiting longer than "batch.timeout" ms, dispatches whatever is buffered.
	 * NOTE(review): runs on the scheduler thread and ends up calling
	 * kafkaConsumer.commitSync()/seek() via notify() while the poll loop may be
	 * using the same consumer — KafkaConsumer is not thread-safe; confirm.
	 */
	class BatchTask implements Runnable {

		ListenerConsumer consumer;

		BatchTask(ListenerConsumer consumer) {
			this.consumer = consumer;
		}

		@Override
		public void run() {
			if("java.util.List".equals(consumer.consumerConfig.getParentType()) && getBathTimeout(consumer) >0 && System.currentTimeMillis() - getCurTime() >=  getBathTimeout(consumer) ) {
				consumer.invoker.notify(consumer);
			}
		}

	}

	/**
	 * The poll/dispatch loop. Polls records, deserializes each payload into an
	 * {@link MQEvent}, then either buffers it for batch dispatch or notifies the
	 * listener immediately; commits or seeks depending on the listener result and
	 * the retry counter in the external cache.
	 */
	class ListenerInvoker{

		// Released when the loop exits (currently nothing awaits it from this class).
		private final CountDownLatch exitLatch = new CountDownLatch(1);

		// Loop-control flag; cleared externally to stop, re-armed by DaemonTask.
		private  boolean active = true;

		private  ListenerConsumer consumer;

		ListenerInvoker(ListenerConsumer consumer) {
			this.consumer = consumer;
		}

		/**
		 * Runs the poll loop on the CALLING thread until {@code active} is cleared;
		 * closes the KafkaConsumer on exit.
		 */
		public void  start() {
			try {
				while (this.active) {
					try {
						ConsumerRecords<String, String> records = consumer.kafkaConsumer.poll(consumer.consumerConfig.getTimeout());
						if (records == null || records.count() <=0) {
							continue;
						}
						
						if (!this.active) {
							if (consumer.logger.isTraceEnabled()) {
								consumer.logger.trace("No records to process");
							}
							continue;
						}
						
						for (ConsumerRecord<String, String> record : records) {
							MQEvent<Object> message = new MQEvent<Object>();
							// NOTE(review): record.value() is dereferenced here before
							// the null guard below — if record could truly be null this
							// debug line would NPE first; confirm guard ordering.
							if(logger.isDebugEnabled()) {
								logger.debug("Record.topic:{}.value:{}",consumer.consumerConfig.getTopic(),record.value());
							}
							if(record == null || StringUtils.isEmpty(record.value())) {
								continue;
							}
							// Deserialize the payload into the configured param type;
							// plain Strings are passed through untouched.
							if (StringUtils.isNotEmpty(consumer.consumerConfig.getParamType())) {
								try {
									if(!"java.lang.String".equals(consumer.consumerConfig.getParamType())){
										Class<?> cl = Class.forName(consumer.consumerConfig.getParamType());
										message.setContent(SerializationFactory.factory(consumer.consumerConfig.getTransfer()).deserializer(record.value(), cl));
									}else{
										message.setContent(record.value());
									}
								
								} catch (Exception e) {
									// Undeserializable payload: commit to skip it rather than loop forever.
									consumer.kafkaConsumer.commitSync();  
									logger.error(e.getMessage() + new String(String.valueOf(record.value())), e);
								}
							} else {
								message.setContent(record.value());
							}
							
							
							if("java.util.List".equals(consumer.consumerConfig.getParentType())) {
								// Batch mode: drop records that exhausted their retries,
								// otherwise buffer the event and remember its offset.
								if(!isValid(key(record.topic(), record.partition(), record.offset()))) {
									consumer.kafkaConsumer.commitSync();
									deleteRedisKey(key(record.topic(), record.partition(), record.offset()));
									continue;
								}
								
								bufferEventList.add(message);
								TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());  
			                    OffsetAndMetadata offset = new OffsetAndMetadata(record.offset());  
								offsetsMap.put(topicPartition, offset);
								
								// Start the batch-timeout clock on the first buffered record.
								if(curTime <=0) {
									setCurTime(System.currentTimeMillis());
								}
								
								if(logger.isDebugEnabled()) {
									logger.debug("BufferEventList.size:{}",bufferEventList.size());
								}
								// Size-based flush once the buffer reaches "batch.num".
								if(bufferEventList.size() >= getBathNum(consumer)) {
									notify(consumer);
								}
							}else {
								// Single-record mode: dispatch immediately.
								ConsumeResultStatus status = managerListener.notifyListener(consumer.consumerConfig, message);
								if (ConsumeResultStatus.SUCCESS == status && !consumer.consumerConfig.isAutoCommit()) {
									consumer.kafkaConsumer.commitSync();
									// Operating the cache on every consumption would hurt performance.
									//deleteRedisKey(key(record.topic(), record.partition(), record.offset()));
								}else {
									// Failure: replay the record while retries remain,
									// otherwise commit past it and drop its retry counter.
									if(isRetries(key(record.topic(), record.partition(), record.offset()))) {
										TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
										consumer.kafkaConsumer.seek(topicPartition, record.offset());
									}else {
										consumer.kafkaConsumer.commitSync();
										deleteRedisKey(key(record.topic(), record.partition(), record.offset()));
									}
								}
							}
						}
					} catch (Exception e) {
						// NOTE(review): any failure (including poll/commit errors) is
						// answered with an unconditional commitSync, and only
						// e.getMessage() is logged — the stack trace is lost; confirm.
						consumer.kafkaConsumer.commitSync();  
						logger.error(e.getMessage());
						if (!this.active) {
							Thread.currentThread().interrupt();
						} else {
							if (consumer.logger.isDebugEnabled()) {
								consumer.logger.debug("Interrupt ignored");
							}
						}
					}
				}
			} finally {
				this.active = false;
				this.exitLatch.countDown();
				consumer.kafkaConsumer.close();
			}
		}
		
		// Cache key identifying one record: "<topic>_<partition>_<offset>".
		private String key(String topic,int partition,long offset) {
			StringBuilder sb = new StringBuilder();
			sb.append(topic);
			sb.append("_");
			sb.append(partition);
			sb.append("_");
			sb.append(offset);
			return sb.toString();
		}
		
		// Removes the retry counter for a record; no-op when retries are disabled.
		private void deleteRedisKey(String key) {
			final int  retriesNum  = getRetriesNum();
			if(retriesNum <=0) {
				return;
			}
			consumer.consumerConfig.getCache().delete(key);
		}
		
		/**
		 * Increments the record's retry counter (with TTL from getExpire()) and
		 * returns true while the count is still within "retries.num". Any cache
		 * error is treated as "do not retry".
		 */
		private boolean isRetries(String key) {
			try {
				
				
				final int  retriesNum  = getRetriesNum();
				if(retriesNum <=0) {
					return false;
				}
				final Long value = consumer.consumerConfig.getCache().inc(key,consumer.consumerConfig.getExpire());
				final boolean flag =  value <= retriesNum;
				if(flag) {
					logger.info("The message was retried {} times and the key was {}.",value,key);
					return flag;
				}
				return false;
			}catch(Exception e) {
				logger.error(e.getMessage());
				return false;
			}
			
		}
		
		// True while the record's retry counter has not yet reached "retries.num"
		// (always true when retries are disabled).
		private boolean isValid(String key) {
			final int  retriesNum  = getRetriesNum();
			if(retriesNum <=0) {
				return true;
			}
			Integer result = consumer.consumerConfig.getCache().get(key);			
			int value = result == null ? 0:result;
			return value < getRetriesNum();
			
		}
		
		// Max retry count from the "retries.num" config key; defaults to RETRIES_NUM (0).
		private int getRetriesNum() {
			return consumer.consumerConfig.getConfig().get("retries.num") != null  ?  Integer.parseInt(String.valueOf(consumer.consumerConfig.getConfig().get("retries.num"))) : RETRIES_NUM;
		}
		
		/**
		 * Flushes the batch buffer to the listener. On success (and manual-commit
		 * mode) commits; on failure seeks back to each buffered partition offset
		 * while retries remain. Always clears the buffer/offset map afterwards —
		 * so on failure the un-retried events are dropped from the local buffer
		 * and only recovered via the seek.
		 */
		private   void notify(ListenerConsumer consumer) {
			if(bufferEventList.size() <=0) {
				return;
			}
			try {
				if(logger.isDebugEnabled()) {
					logger.debug("Notify.bufferEventList.size:{}",bufferEventList.size());
				}
				ConsumeResultStatus status = managerListener.notifyListener(consumer.consumerConfig, bufferEventList);
				if (ConsumeResultStatus.SUCCESS == status && !consumer.consumerConfig.isAutoCommit()) {
					try {
						consumer.kafkaConsumer.commitSync();  						
					}catch(Exception  e) {
						logger.error(e.getMessage());
					}
				}else {
					
					for(Map.Entry<TopicPartition, OffsetAndMetadata> map : offsetsMap.entrySet()) {
						try {
							if(isRetries(key(map.getKey().topic(), map.getKey().partition(), map.getValue().offset()))) {
								consumer.kafkaConsumer.seek(map.getKey(), map.getValue().offset());
							}
						}catch(Exception  e) {
							logger.error(e.getMessage());
						}
					}
					
				}
			}finally {
				clear();
			}
		}
	}
	
	
	// Resets the batch state: empties the buffer and offset map and restarts the
	// batch-timeout clock.
	private void clear() {
		if(logger.isDebugEnabled()) {
			logger.debug("Notify.clear:{}",bufferEventList.size());
		}
		bufferEventList.clear();
		offsetsMap.clear();
		setCurTime(System.currentTimeMillis());
	}
	
	
	// Batch flush timeout in ms from the "batch.timeout" config key (0 when absent).
	// NOTE(review): "Bath" looks like a typo for "Batch"; value is read as int.
	private long getBathTimeout(ListenerConsumer consumer) {
		return MapUtils.getIntValue(consumer.consumerConfig.getConfig(),"batch.timeout");
	}
	
	// Batch size threshold from the "batch.num" config key (0 when absent).
	private int getBathNum(ListenerConsumer consumer) {
		return MapUtils.getIntValue(consumer.consumerConfig.getConfig(),"batch.num");
	}

}
